
Conversation

@Shubhrakanti Shubhrakanti commented Sep 9, 2025

The NVIDIA Riva SDK provides synchronous APIs for both speech-to-text and text-to-speech operations. To prevent blocking the event loop:

  • STT: Runs _recognition_worker in a separate thread to handle synchronous streaming_response_generator calls, using call_soon_threadsafe to emit speech events back to the main event loop
  • TTS: Runs _synthesize_worker in a separate thread for synchronous synthesize_online calls, using call_soon_threadsafe to push generated audio chunks to the output emitter

Try it out with uv run examples/voice_agents/nvidia_test.py console
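The worker-thread pattern described above can be sketched as a standalone toy, with a plain generator standing in for Riva's synchronous streaming_response_generator (all names here are illustrative, not the plugin's actual internals):

```python
import asyncio
import threading
import time


def blocking_generator():
    # stand-in for Riva's synchronous streaming_response_generator
    for i in range(3):
        time.sleep(0.05)  # simulated blocking network wait
        yield f"chunk-{i}"


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    received: list[str] = []
    done = asyncio.Event()

    def worker() -> None:
        # runs in a separate thread so the blocking iteration never
        # stalls the event loop; results are handed back thread-safely
        for chunk in blocking_generator():
            loop.call_soon_threadsafe(received.append, chunk)
        loop.call_soon_threadsafe(done.set)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    await done.wait()  # event loop stays free while the thread blocks
    await asyncio.to_thread(thread.join)
    return received


print(asyncio.run(main()))  # ['chunk-0', 'chunk-1', 'chunk-2']
```

call_soon_threadsafe is the only safe way to schedule work on the loop from another thread; the FIFO ordering guarantees all chunks are appended before the done event fires.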


try:
    await asyncio.gather(*tasks)
    await asyncio.to_thread(synthesize_thread.join, timeout=5.0)


Hi, I just got a chance to test this, but this timeout is causing TTS to be cut off before finishing long sentences (>5 s). Is this working properly for you? I set it to 20 and everything works as expected.

@Shubhrakanti (Contributor Author)

I think we can remove the timeout altogether? The TTS can be arbitrarily long.
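A minimal sketch of joining without a timeout, assuming the worker thread eventually drains on its own once input ends (the long_synthesis function is a stand-in for a long synthesize_online call):

```python
import asyncio
import threading
import time


def long_synthesis() -> None:
    # stand-in for a synthesize_online call that can run arbitrarily long
    time.sleep(0.2)


async def shutdown(worker: threading.Thread) -> None:
    # no timeout: wait for the worker to finish naturally instead of
    # cutting speech off after a fixed number of seconds
    await asyncio.to_thread(worker.join)
    assert not worker.is_alive()


t = threading.Thread(target=long_synthesis)
t.start()
asyncio.run(shutdown(t))
```

Wrapping join in asyncio.to_thread keeps the event loop responsive even though the join itself is unbounded.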


self._auth = riva.client.Auth(
    uri=stt._opts.server,
    use_ssl=True,
@riqiang-dp riqiang-dp Sep 18, 2025

It would be nice to have the option to set it to False (for local deployment and testing).

if not self._tts_service:
    auth = riva.client.Auth(
        uri=self._opts.server,
        use_ssl=True,


same here
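One way such a toggle might be plumbed through an options object; this is a hedged sketch, and the _STTOptions dataclass and build_auth_kwargs helper here are hypothetical stand-ins, not the plugin's actual types:

```python
from dataclasses import dataclass


@dataclass
class _STTOptions:
    # hypothetical options holder: default to SSL for hosted endpoints,
    # but allow use_ssl=False for local deployment and testing
    server: str
    use_ssl: bool = True


def build_auth_kwargs(opts: _STTOptions) -> dict:
    # kwargs that would be forwarded to riva.client.Auth(...)
    return {"uri": opts.server, "use_ssl": opts.use_ssl}


local = build_auth_kwargs(_STTOptions(server="localhost:50051", use_ssl=False))
hosted = build_auth_kwargs(_STTOptions(server="grpc.nvcf.nvidia.com:443"))
```

The default preserves today's behavior, so callers who never touch the flag see no change.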

@Shubhrakanti (Contributor Author)

@riqiang-dp let me know if there's anything else you'd like changed!

uri=stt._opts.server,
use_ssl=stt._opts.use_ssl,
metadata_args=[
    ["authorization", f"Bearer {stt.nvidia_api_key}"],
@Shubhrakanti (Contributor Author)

If it's localhost we can just omit this (no need for an API key).


@Shubhrakanti Shubhrakanti requested review from theomonnom and removed request for riqiang-dp October 23, 2025 20:32

@Shubhrakanti Shubhrakanti left a comment


Need to think about these things. Let me know if you have any thoughts @theomonnom.

I should also add a comment on why we use a separate thread, since it's different from the usual async implementation in plugins.


Comment on lines 141 to 142
if self._thread_exception:
    raise self._thread_exception
@Shubhrakanti (Contributor Author)

Is this a good idea? I was hoping this would trigger the fallback adapter.
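The stash-and-re-raise pattern under discussion, as a self-contained sketch; the Worker class and the simulated ConnectionError are illustrative only, but this is the general mechanism that lets an upstream wrapper (such as a fallback adapter) observe a failure that happened on a worker thread:

```python
import threading


class Worker:
    """Capture an exception raised on a worker thread and re-raise it
    on the caller's side, instead of letting the thread die silently."""

    def __init__(self) -> None:
        self._thread_exception: BaseException | None = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self) -> None:
        try:
            raise ConnectionError("riva stream failed")  # simulated failure
        except BaseException as e:
            self._thread_exception = e  # stash for the caller to re-raise

    def result(self) -> None:
        self._thread.start()
        self._thread.join()
        # re-raising here surfaces the thread's failure in the caller's
        # context, where wrapping code can catch it and fail over
        if self._thread_exception:
            raise self._thread_exception


try:
    Worker().result()
except ConnectionError as e:
    print(e)  # riva stream failed
```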

Comment on lines 206 to 207
except queue.Empty:
    continue
@Shubhrakanti (Contributor Author)

I should add a comment on why this is here.

service = self._tts._ensure_session()
while not self._shutdown_event.is_set():
    try:
        token = self._token_q.get(timeout=0.1)
@Shubhrakanti (Contributor Author)

Are there any performance issues here? Is there a better way to do this?
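One common alternative to the 0.1 s polling loop is a blocking get() plus a shutdown sentinel, so the thread sleeps until a token (or the stop signal) actually arrives instead of waking every 100 ms; a sketch, with hypothetical names:

```python
import queue
import threading

_SENTINEL = object()


def worker(token_q: "queue.Queue[object]", out: list) -> None:
    # blocking get with no timeout: the thread parks until a token or
    # the shutdown sentinel arrives, replacing the Event + timeout loop
    while True:
        token = token_q.get()
        if token is _SENTINEL:
            break
        out.append(token)


q: "queue.Queue[object]" = queue.Queue()
out: list = []
t = threading.Thread(target=worker, args=(q, out))
t.start()
for tok in ("hello", "world"):
    q.put(tok)
q.put(_SENTINEL)  # shutdown signal
t.join()
print(out)  # ['hello', 'world']
```

The trade-off: polling costs a few wakeups per second but lets you also check an external shutdown flag; the sentinel avoids the wakeups entirely but requires that shutdown always goes through the queue.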

Comment on lines 108 to 109
logger.info("Available TTS voices:")
logger.info(json.dumps(tts_models, indent=4))
Member

debug logs

@Shubhrakanti (Contributor Author)

@theomonnom I've updated the threading model and cleaned up some unused code; should be good to go.

@Shubhrakanti Shubhrakanti merged commit 138c8b9 into main Nov 4, 2025
18 checks passed
@Shubhrakanti Shubhrakanti deleted the shubhra/nvidia-plugins branch November 4, 2025 17:49
Comment on lines 19 to 126
from livekit.plugins import cartesia, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel

# uncomment to enable Krisp background voice/noise cancellation
# from livekit.plugins import noise_cancellation

logger = logging.getLogger("basic-agent")

load_dotenv()


class MyAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions="Your name is Kelly. You will interact with users via voice. "
            "With that in mind, keep your responses concise and to the point. "
            "Do not use emojis, asterisks, markdown, or other special characters in your responses. "
            "You are curious and friendly, and have a sense of humor. "
            "You will speak English to the user.",
        )

    async def on_enter(self):
        # when the agent is added to the session, it'll generate a reply
        # according to its instructions
        self.session.generate_reply()

    # all functions annotated with @function_tool will be passed to the LLM when this
    # agent is active
    @function_tool
    async def lookup_weather(
        self, context: RunContext, location: str, latitude: str, longitude: str
    ):
        """Called when the user asks for weather related information.
        Ensure the user's location (city or region) is provided.
        When given a location, please estimate the latitude and longitude of the location and
        do not ask the user for them.

        Args:
            location: The location they are asking for
            latitude: The latitude of the location, do not ask user for it
            longitude: The longitude of the location, do not ask user for it
        """

        logger.info(f"Looking up weather for {location}")

        return "sunny with a temperature of 70 degrees."


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
    # each log entry will include these fields
    ctx.log_context_fields = {
        "room": ctx.room.name,
    }
    session = AgentSession(
        # Speech-to-text (STT) is your agent's ears, turning the user's speech into text that the LLM can understand
        # See all available models at https://docs.livekit.io/agents/models/stt/
        stt=cartesia.STT(),
        # A Large Language Model (LLM) is your agent's brain, processing user input and generating a response
        # See all available models at https://docs.livekit.io/agents/models/llm/
        llm=openai.LLM(),
        # Text-to-speech (TTS) is your agent's voice, turning the LLM's text into speech that the user can hear
        # See all available models as well as voice selections at https://docs.livekit.io/agents/models/tts/
        tts="cartesia/sonic-2:9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
        # VAD and turn detection are used to determine when the user is speaking and when the agent should respond
        # See more at https://docs.livekit.io/agents/build/turns
        turn_detection=MultilingualModel(),
        vad=ctx.proc.userdata["vad"],
        # allow the LLM to generate a response while waiting for the end of turn
        # See more at https://docs.livekit.io/agents/build/audio/#preemptive-generation
        preemptive_generation=True,
        # sometimes background noise could interrupt the agent session; these are considered false positive interruptions
        # when one is detected, you may resume the agent's speech
        resume_false_interruption=True,
        false_interruption_timeout=1.0,
    )

    # log metrics as they are emitted, and total usage after session is over
    usage_collector = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        metrics.log_metrics(ev.metrics)
        usage_collector.collect(ev.metrics)

    async def log_usage():
        summary = usage_collector.get_summary()
        logger.info(f"Usage: {summary}")

    # shutdown callbacks are triggered when the session is over
    ctx.add_shutdown_callback(log_usage)

    await session.start(
        agent=MyAgent(),
        room=ctx.room,
        room_input_options=RoomInputOptions(
            # uncomment to enable Krisp BVC noise cancellation
            # noise_cancellation=noise_cancellation.BVC(),
        ),
        room_output_options=RoomOutputOptions(transcription_enabled=True),
    )

    await session.say("hello world")
    session.shutdown()

@Shubhrakanti (Contributor Author)

Accidental commit. Will undo with #3797

Shubhrakanti added a commit that referenced this pull request Nov 4, 2025


5 participants